Yesterday we covered how the TP notifies the main thread that a task has finished. Today, let's look at what the TP itself is actually running.
The source code below is from libuv's src/threadpool.c:
static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  /* Pool size defaults to ARRAY_SIZE(default_threads); it can be overridden
   * via the UV_THREADPOOL_SIZE environment variable and is clamped to
   * [1, MAX_THREADPOOL_SIZE]. */
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  /* Use the static thread array when it is large enough; otherwise allocate,
   * falling back to the static array if the allocation fails. */
  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  /* The shared work queue plus the queues used to throttle slow I/O. */
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  /* Spawn the workers, then wait until each one posts the semaphore,
   * i.e. until every thread has started. */
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}
No need to read it line by line: in essence, it creates as many threads as the pool size specifies, and each thread runs the worker function. A sketch of the sizing logic follows.
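As a quick sanity check of that sizing logic, here is a minimal standalone sketch. The default of 4 and the cap of 1024 are assumptions mirroring current libuv's ARRAY_SIZE(default_threads) and MAX_THREADPOOL_SIZE; the authoritative values live in threadpool.c.

#include <stdio.h>
#include <stdlib.h>

/* Assumed stand-ins for libuv's internal constants. */
#define DEFAULT_THREADS 4
#define MY_MAX_THREADPOOL_SIZE 1024

int main(void) {
  unsigned int nthreads = DEFAULT_THREADS;
  const char* val = getenv("UV_THREADPOOL_SIZE");

  /* Same clamping as init_threads: env override, floor of 1, hard cap. */
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MY_MAX_THREADPOOL_SIZE)
    nthreads = MY_MAX_THREADPOOL_SIZE;

  printf("thread pool size: %u\n", nthreads);
  return 0;
}

Run it with UV_THREADPOOL_SIZE=16 and it prints 16; unset, it prints the default of 4.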
Now let's look at worker. (We won't go through every branch here, since it touches on the mechanism for handling different kinds of I/O work.)
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  /* Only one thread may access the task queue at a time. */
  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */
    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */
    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }
      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;
      is_slow_work = 1;
      slow_io_work_running++;
      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);
      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }
    uv_mutex_unlock(&mutex);
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
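One thing the excerpt does not show is slow_work_thread_threshold(), which the loop above consults twice. In current libuv it simply caps concurrent slow I/O at about half the pool:

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}

So with the default pool of 4 threads, at most 2 can be occupied by slow I/O at once; any further slow work gets pushed back behind run_slow_work_message, exactly as the re-queueing branch above does.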
Its job is to keep taking tasks out of the TP's queue and completing them one by one; when there is nothing left to do, the thread goes back to sleep on the condition variable.
The same data structure also provides functions for submitting tasks to the TP.
To sum up: Node's TP maintains several queues that hold different kinds of tasks, and it exposes functions so the C++ layer can push asynchronous I/O tasks onto those queues. The TP itself is a set of threads that keep taking tasks off the queues and executing them. When a task finishes, the worker appends the request to the event loop's wq (w->loop->wq, not the TP's global wq used above to hold pending work) and calls uv_async_send, which sets async_sent to 1 to mark the completion. So that wq is the holding area for tasks whose I/O has completed.
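To see this whole round trip from the caller's side, here is a minimal sketch using libuv's public uv_queue_work API, which submits a task to this TP. on_work runs on a pool thread (the w->work(w) call in worker above), and on_after_work runs back on the event-loop thread once uv_async_send has woken it; the two callback names are just illustrative.

#include <stdio.h>
#include <uv.h>

/* Runs on a thread-pool worker thread. */
static void on_work(uv_work_t* req) {
  printf("executing on a pool thread\n");
}

/* Runs on the event-loop thread after the pool signals completion. */
static void on_after_work(uv_work_t* req, int status) {
  printf("back on the loop thread (status=%d)\n", status);
}

int main(void) {
  uv_work_t req;
  uv_queue_work(uv_default_loop(), &req, on_work, on_after_work);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

Compile with -luv. Node's fs operations and dns.lookup go through this same pool internally.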
Back to where we started: next we need to figure out what the async_cb invoked during the pending phase actually is.
See you tomorrow!